
T128 refactor loader #128, #122 #138

Merged
merged 28 commits into from
Aug 27, 2021

Conversation

@dolsysmith (Contributor) commented Aug 13, 2021

Summary

This branch contains the following changes:

  1. Upgrades Spark to v. 3.1.2
  2. Upgrades Java to v. 11.11+9
  3. Upgrades Python to 3.8 (and other dependencies as required)
  4. Upgrades Pyspark to 3.1.2
  5. spark-loader command now uses the Spark DataFrame API to 1) load Tweets into ES and 2) create extracts.
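For reviewers unfamiliar with the connector, a DataFrame write to Elasticsearch via elasticsearch-hadoop generally looks like the sketch below. The index name, host, and mapping-id column are illustrative placeholders, not the exact values used in tweetset_loader.py.

```python
# Sketch of a DataFrame write to Elasticsearch via elasticsearch-hadoop.
# Host, index, and column names here are placeholders, not this branch's values.
def es_write_options(nodes, index, mapping_id="tweet_id"):
    """Build the option dict for an elasticsearch-hadoop DataFrame write."""
    return {
        "es.nodes": nodes,
        "es.resource": index,
        "es.mapping.id": mapping_id,  # use the tweet id as the ES document id
    }

# Usage (requires a running SparkSession and ES cluster):
# (df.write
#    .format("org.elasticsearch.spark.sql")
#    .options(**es_write_options("elasticsearch:9200", "tweets"))
#    .mode("append")
#    .save())
```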

Setup

You will need an NFS mount shared between your primary and cluster nodes/VMs for this branch to work properly. See the instructions in the original issue.

Make sure that loader.docker-compose.yml is configured to build the Docker image from Dockerfile-loader. Likewise for the spark-master and spark-worker containers in docker-compose.yml on both primary and cluster nodes.

You'll also need to update your .env file to point the DATASET_PATH variable to the shared NFS mount. (On my VMs, this is /storage/dataset_loading.)

Currently, the data extracts are written to the same directory used for loading datasets, so they will not appear in the TweetSets UI, which still looks for them elsewhere. But I didn't want to touch tweetset_server.py in this branch, given the work Laura has been doing on the Python 3.8 upgrade.

Testing

It will be useful to load the same dataset with the regular loader and then with the Spark loader, in order to compare results in the UI.

  1. Bring up TweetSets (making sure it builds the new images for Dockerfile-spark).
  2. Create a dataset.json file in the directory with the JSON of the tweets to load.
  3. Bring up the loader container.
  4. Load a sample dataset using the regular (non-Spark) loader.
  5. Edit the dataset.json with a name to distinguish it from the previous load.
  6. Load the dataset again with the following command:
spark-submit \
 --jars elasticsearch-hadoop.jar \
 --master spark://$SPARK_MASTER_HOST:7101 \
 --py-files dist/TweetSets-2.1.0-py3.8.egg,dependencies.zip \
 --conf spark.driver.bindAddress=0.0.0.0 \
 --conf spark.driver.host=$SPARK_DRIVER_HOST \
 tweetset_loader.py spark-create /dataset/sample/json

This presumes that your sample dataset is in the /storage/dataset_loading/sample directory (or whatever NFS mount is mapped to /dataset in the .env file). In my testing, I put the tweet JSON files and dataset.json in a json subdirectory, but that's not strictly necessary.

  7. Unfortunately, the UI will not work on this branch, due to the changes required for Python 3.8. So to test it in the UI, you should:
    a. Bring down TweetSets on both clusters.
    b. Check out the master branch or Laura's Py38 branch (t126-python-38).
    c. Remove the server image (on the primary node) to force a rebuild: docker image rm ts_server-flaskrun:latest
    d. Bring TweetSets back up.

Expected Results

Elasticsearch indexing

  • Spark loader should load the data and create extracts without errors.
  • Datasets limited in TweetSets should mostly be identical. The following are expected differences:
    • More hashtags may be present in this implementation, due to more consistent use of the extended_tweet and retweeted_status elements.
    • More URLs may be present, for the same reasons as above.
    • Fewer results for (at least some) keyword searches. This is due to the exclusion of the quoted_status text fields for tweets of type quote. (This decision was made in consultation with Laura for the sake of consistency.)

Let me know if you see inconsistencies in the indexing that don't make sense with the above.

Dataset extracts

  • Spark automatically writes each extract type to its own subdirectory. The following should be present:
    • tweet_json
    • tweet_ids
    • tweet_csv
    • tweet_mentions/top_mentions
    • tweet_mentions/nodes
    • tweet_mentions/edges

Each should contain one or more zipped files. I've tested the CSV files against those created by twarc.json2csv (1.12.1) and documented some minor differences in the data dictionary. Feel free to test them against full extracts created in the UI, but do keep in mind that the current version of TS is using an older version of json2csv.

In my testing, Spark created far too many files for the mention extracts. I assume that is a setting that can be configured, but I haven't looked into that yet.
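For what it's worth, the number of output files corresponds to the number of partitions at write time, so a coalesce before the write is the usual fix. A rough sketch follows; the partition-sizing heuristic is my own suggestion, not code from this branch, and `est_bytes` is a hypothetical size estimate.

```python
import math

def target_partitions(total_bytes, target_file_bytes=128 * 1024 * 1024):
    """Pick a partition count aiming for ~128 MB per output file,
    a common Spark rule of thumb. Heuristic only; not from this branch."""
    return max(1, math.ceil(total_bytes / target_file_bytes))

# Usage under Spark (sketch; est_bytes would come from the input dataset size):
# mentions_df.coalesce(target_partitions(est_bytes)) \
#     .write.option("compression", "gzip").csv(out_path)
```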

Performance

I am curious to hear how you experience it. I would expect this code to be faster at least for large datasets -- it was so in testing on my own laptop -- but I haven't had a chance to test a large dataset on the VMs. There may be Spark settings we can tweak to improve performance, but my impression is that for some of these, what's optimal depends on the environment, and our dev VMs are not terribly good proxies for production. We might have more success testing this aspect on tweetsets-dev.

@dolsysmith dolsysmith requested review from lwrubel and kerchner August 13, 2021 15:44
@dolsysmith dolsysmith self-assigned this Aug 13, 2021
@dolsysmith dolsysmith linked an issue Aug 13, 2021 that may be closed by this pull request
@dolsysmith dolsysmith force-pushed the t128-refactor-loader branch from 674c0af to 0739b6f on August 19, 2021 14:44
@dolsysmith (Contributor, Author)

Updated spark_utils.py so that the original JSON (unparsed) is stored in the tweet field. This method avoids discrepancies arising from the difference between Python's and Spark's handling of null fields in the original tweet JSON. The short version is that some fields in Twitter's JSON schema are present with null values, and others can be absent entirely. Spark, being schema-based, will treat absent fields and fields present but with a null value the same. Python, on the other hand, will look for the presence of a key, and according to the logic in json2csv, sometimes it will behave differently depending on whether a key is present (but empty) or absent entirely. This change should obviate this problem, as well as preserve the original structure of the Twitter API JSON.
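A minimal pure-Python illustration of the null-vs-absent distinction described above (field names are illustrative):

```python
import json

# Two tweets: one with a key present but null, one with the key absent.
present_null = '{"id_str": "1", "coordinates": null}'
absent = '{"id_str": "2"}'

# Parsed with Python, the two cases are distinguishable:
assert "coordinates" in json.loads(present_null)   # key present, value None
assert "coordinates" not in json.loads(absent)     # key missing entirely

# A schema-based reader such as Spark's would surface both rows with
# coordinates = null, erasing that distinction. Storing the raw line
# unparsed in the tweet field keeps the original shape byte-for-byte:
raw_tweet_field = present_null  # stored exactly as received
assert '"coordinates": null' in raw_tweet_field
```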

@dolsysmith (Contributor, Author)

Updated the JSON schema file so that the full_text and retweeted_status.full_text fields are read (if present) by the loader.

spark_utils.py Outdated
'''Loads a set of JSON tweets as strings and adds a column index. We do this so that the ultimate output in the JSON extract will have the same null fields as the original.

:param spark: an initialized SparkSession object
:param path_to_dataset: a comma-separated list of JSON files to load'''
Collaborator:
Should this be :param path_to_tweets:?

Contributor Author:

Thanks -- fixed.

@dolsysmith (Contributor, Author)

Updated branch as follows:

  1. Writing to ES (tweetset_loader.py, line 354) now uses only the subset of fields we want to index.
  2. elasticsearch-hadoop configuration updated to exclude tweet_id field from indexing.
  3. JSON object of tweet now joined to parsed tweet by unique tweet id. (I was joining on a row number previously, but such numbering is evidently not determinate when loading from multiple files. In other words, loading the same dataset twice can produce different row numberings if the data are distributed across multiple files.)

@dolsysmith (Contributor, Author)

dolsysmith commented Aug 25, 2021

New approach: we load the JSON-L as an RDD, then convert that to a DataFrame, allowing us to preserve the original string representation of the JSON as a separate column and obviating the need for a join (which creates problems on smaller datasets). This approach seems stable, but performance has taken a hit (relative to previous implementations) that is evident on larger datasets. Loading 20 GB with the new implementation (not counting the time to create extracts) took ~35 min, vs. 20 min using the implementation currently in production.
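A rough sketch of this approach; the function and variable names are mine, not necessarily those in spark_utils.py:

```python
import json
# from pyspark.sql import SparkSession  # available in the loader container

def line_to_pair(line):
    """Pair the raw JSON-L line with its tweet id, keeping the original
    string untouched so the JSON extract round-trips exactly.
    Sketch only; field handling in spark_utils.py may differ."""
    return {"tweet_id": json.loads(line)["id_str"], "tweet": line}

# Usage under Spark (sketch):
# rdd = spark.sparkContext.textFile(path_to_tweets).map(line_to_pair)
# df = spark.createDataFrame(rdd)  # raw string preserved in the tweet column
```

Keeping the raw string in the same record from the start is what makes the previous id-based join unnecessary.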

@lwrubel (Collaborator)

lwrubel commented Aug 25, 2021

Is that comparison against the previous implementations of Spark 3? Or compared to TweetSets 2.1?

@lwrubel (Collaborator)

lwrubel commented Aug 25, 2021

Oh never mind, I see you said compared to production. Sorry!

@dolsysmith (Contributor, Author)

This version uses the RDD API for loading to TweetSets (in order to preserve the original JSON as is) and the DataFrame API to create the extracts. Performance is comparable to what's in production for loading and significantly improved for creating extracts.

The Spark SQL code includes fields that we use for indexing in Elasticsearch; these are dropped when creating the CSV. I will leave them there for now (I don't think their presence really impacts performance) with an eye toward a future release where we no longer need to load the full JSON into Elasticsearch. At that point, we can use the DataFrame API for everything (which should improve performance further).
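The drop-before-CSV step can be as simple as the sketch below; the column names are hypothetical, not the actual ones in this branch.

```python
# Hypothetical list of columns used only for Elasticsearch indexing,
# not the actual column names in this branch.
INDEX_ONLY_COLUMNS = ["tweet", "text_all", "hashtags_all"]

def csv_columns(all_columns, index_only=INDEX_ONLY_COLUMNS):
    """Return the columns that belong in the CSV extract."""
    return [c for c in all_columns if c not in index_only]

# Usage under Spark (sketch):
# csv_df = df.select(*csv_columns(df.columns))
# csv_df.write.option("header", True).csv(out_path)
```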

@dolsysmith dolsysmith merged commit 5229297 into master Aug 27, 2021
Development

Successfully merging this pull request may close these issues.

Refactor tweetset_loader.py to use Spark DataFrame API